package com.atuguigu.flink.Day08;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Random;

//解决数据倾斜
public class Example1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        env
                .fromElements(
                        Tuple3.of("a", 1L, 1000L),
                        Tuple3.of("a", 1L, 2000L),
                        Tuple3.of("a", 1L, 3000L),
                        Tuple3.of("a", 1L, 4000L),
                        Tuple3.of("a", 1L, 5000L),
                        Tuple3.of("a", 1L, 6000L),
                        Tuple3.of("a", 1L, 7000L),
                        Tuple3.of("a", 1L, 8000L),
                        Tuple3.of("a", 1L, 9000L),
                        Tuple3.of("a", 1L, 10000L),
                        Tuple3.of("b", 1L, 11000L)
                )//改变key的值
        .map(
                new MapFunction<Tuple3<String, Long, Long>, Tuple3<String,Long,Long>>() {
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple3<String, Long, Long> value) throws Exception {
                        Random random = new Random();
                        return Tuple3.of(value.f0 + "-" +random.nextInt(4),value.f1,value.f2);
                    }
                }
        )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Long, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Long, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, Long, Long> element, long recordTimestamp) {
                                return element.f2;
                            }
                        })
                )
                .keyBy(r->r.f0)
                .process(new KeyedProcessFunction<String, Tuple3<String, Long, Long>, Tuple2<String,Long>>() {
                    //将相同的key进行累加，2个状态，一个sum，一个保存时间
                    private ValueState<Long> sum;
                    private ValueState<Long> timeTs;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        sum=getRuntimeContext().getState(new ValueStateDescriptor<Long>("sum", Types.LONG));
                        timeTs=getRuntimeContext().getState(new ValueStateDescriptor<Long>("timets", Types.LONG));
                    }



                    @Override
                    public void processElement(Tuple3<String, Long, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        //如果是第一次进来的key
                        if(sum.value() == null){
                            sum.update(value.f1);
                            ctx.timerService().registerEventTimeTimer(value.f2 + 10 * 1000L);
                            timeTs.update(value.f2 + 10 * 1000L);
                        }else {
                            //进来的不是第一进来的key，则进行累加
                            sum.update(value.f1 + sum.value());
                            if(timeTs.value() == null){
                                ctx.timerService().registerEventTimeTimer(value.f2 + 10 * 1000L);
                                timeTs.update(value.f2 + 10 * 1000L);
                            }
                        }




                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        out.collect(Tuple2.of(ctx.getCurrentKey(),sum.value()));
                        timeTs.clear();
                    }

                })
                .map(new MapFunction<Tuple2<String, Long>, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {

                        return Tuple2.of(value.f0.split("-")[0],value.f1);
                    }
                })
             .keyBy(r->r.f0)
                .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String,Long>>() {
                    private MapState<String,Long> mapstate;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        mapstate=getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("sum" ,Types.STRING,Types.LONG));
                    }

                    @Override
                    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        mapstate.put(value.f0,value.f1);
                        long sum=0L;
                        for(Long v:mapstate.values()){
                            sum+=v;
                        }
                        out.collect(Tuple2.of(value.f0,sum));

                    }
                }).print();













        env.execute();
    }
}
